import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Labels [value] as `'even'` when it is exactly `0` and `'odd'` otherwise.
///
/// Callers in this file pass a remainder (`value % 2`), so the input is
/// compared against zero rather than tested for parity here.
String _toEventOdd(int value) {
  if (value == 0) {
    return 'even';
  }
  return 'odd';
}

/// Test page that registers scenarios for rxdart's `groupBy` stream
/// extension.
///
/// Most scenarios are run twice: once with only a key selector and once with
/// an additional `durationSelector`.
///
/// NOTE(review): `test`, `expect` and `nullableTest` are declared outside this
/// file. `expect` is always called with a single argument here, so what it
/// actually verifies cannot be determined from this file — confirm against
/// its definition.
class GroupByTestPage extends TestPage {
  /// Creates the page and registers every `groupBy` test case.
  GroupByTestPage(super.title) {
    test('Rx.groupBy', () async {
      expect(
        Stream.fromIterable([1, 2, 3, 4]).groupBy((value) => value),
      );

      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => value, durationSelector: (_) => Rx.never()),
      );
    });

    test('Rx.groupBy.correctlyEmitsGroupEvents', () async {
      // Each grouped stream is flattened back into `{key: event}` maps.
      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => _toEventOdd(value % 2))
            .flatMap((stream) => stream.map((event) => {stream.key: event})),
      );

      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy(
              (value) => _toEventOdd(value % 2),
              durationSelector: (_) =>
                  Stream.periodic(const Duration(seconds: 1)),
            )
            .flatMap((stream) => stream.map((event) => {stream.key: event})),
      );
    });

    test('Rx.groupBy.correctlyEmitsGroupEvents.alternate', () async {
      // Each grouped stream is folded into a single `{key: [events...]}` map.
      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => _toEventOdd(value % 2))
            .map((stream) async => stream.fold(
                  {stream.key: <int>[]},
                  (Map<String, List<int>> previous, element) =>
                      previous..[stream.key]?.add(element),
                )),
      );

      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy(
              (value) => _toEventOdd(value % 2),
              durationSelector: (_) =>
                  Stream.periodic(const Duration(seconds: 1)),
            )
            .map((stream) async => stream.fold(
                  {stream.key: <int>[]},
                  (Map<String, List<int>> previous, element) =>
                      previous..[stream.key]?.add(element),
                )),
      );
    });

    test('Rx.groupBy.emittedStreamCallOnDone', () async {
      // Draining a grouped stream yields 'done' once that stream closes.
      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => value)
            .map((stream) async => stream.drain('done')),
      );

      expect(
        Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => value, durationSelector: (_) => Rx.never())
            .map((stream) async => stream.drain('done')),
      );
    });

    test('Rx.groupBy.asBroadcastStream', () async {
      // A broadcast source is listened to twice through the same grouped
      // stream.
      {
        final stream = Stream.fromIterable([1, 2, 3, 4])
            .asBroadcastStream()
            .groupBy((value) => value);

        stream.listen(null);
        stream.listen(null);
        expect(true);
      }

      {
        final stream =
            Stream.fromIterable([1, 2, 3, 4]).asBroadcastStream().groupBy(
                  (value) => value,
                  durationSelector: (_) =>
                      Stream.periodic(const Duration(seconds: 2)),
                );

        stream.listen(null);
        stream.listen(null);

        expect(true);
      }
    });

    test('Rx.groupBy.pause.resume', () async {
      // The subscription is paused right after listening and resumes when the
      // 100 ms future completes; it cancels itself after the fourth group.
      {
        var count = 0;
        late StreamSubscription<void> subscription;

        subscription = Stream.fromIterable([1, 2, 3, 4])
            .groupBy((value) => value)
            .listen((result) {
          count++;

          if (count == 4) {
            subscription.cancel();
          }
        });

        subscription
            .pause(Future<void>.delayed(const Duration(milliseconds: 100)));
      }

      {
        var count = 0;
        late StreamSubscription<void> subscription;

        subscription = Stream.fromIterable([1, 2, 3, 4])
            .groupBy(
              (value) => value,
              durationSelector: (_) =>
                  Rx.timer(null, const Duration(seconds: 1)),
            )
            .listen((result) {
          count++;

          if (count == 4) {
            subscription.cancel();
          }
        });

        subscription
            .pause(Future<void>.delayed(const Duration(milliseconds: 100)));
      }
    });

    test('Rx.groupBy.error.shouldThrow.onError', () async {
      // The source stream itself emits an error; it is handed to `expect`
      // from the `onError` callback.
      {
        final streamWithError =
            Stream<void>.error(Exception()).groupBy((value) => value);

        streamWithError.listen(
          null,
          onError: (Object e, StackTrace s) {
            expect(e);
          },
        );
      }

      {
        final streamWithError = Stream<void>.error(Exception()).groupBy(
          (value) => value,
          durationSelector: (_) => Rx.timer(null, const Duration(seconds: 1)),
        );

        streamWithError.listen(
          null,
          onError: (Object e, StackTrace s) {
            expect(e);
          },
        );
      }
    });

    test('Rx.groupBy.error.shouldThrow.onGrouper', () async {
      // The key selector throws for every element.
      {
        final streamWithError =
            Stream.fromIterable([1, 2, 3, 4]).groupBy((value) {
          throw Exception();
        });

        streamWithError.listen(
          null,
          onError: (Object e, StackTrace s) {
            expect(e);
          },
        );
      }

      {
        final streamWithError = Stream.fromIterable([1, 2, 3, 4]).groupBy(
          (value) => throw Exception(),
          durationSelector: (_) => Rx.timer(null, const Duration(seconds: 1)),
        );

        streamWithError.listen(
          null,
          onError: (Object e, StackTrace s) {
            expect(e);
          },
        );
      }
    });

    test('Rx.groupBy accidental broadcast', () async {
      // A single-subscription controller stream is listened to a second time
      // inside the closure passed to `expect`.
      // NOTE(review): presumably the helper checks that this second listen
      // throws — confirm against its definition.
      {
        final controller = StreamController<int>();

        // A named parameter is used instead of `(_) => _`: from Dart 3.7 on,
        // `_` is a non-binding wildcard and can no longer be read.
        final stream = controller.stream.groupBy((value) => value);

        stream.listen(null);
        expect(() => stream.listen(null));

        controller.add(1);
      }

      {
        final controller = StreamController<int>();

        final stream = controller.stream.groupBy(
          (value) => value,
          durationSelector: (_) => Rx.timer(null, const Duration(seconds: 1)),
        );

        stream.listen(null);
        expect(() => stream.listen(null));

        controller.add(1);
      }
    });

    test('Rx.groupBy.durationSelector', () {
      // NOTE(review): presumably the emissions should cycle through
      // '0 -> 1', '1 -> 1', '2 -> 1', '0 -> 2', '1 -> 2', '2 -> 2' — confirm.
      // Nothing here asserts that pattern, because `expect` only receives the
      // stream.
      const take = 30;

      final stream =
          Stream.periodic(const Duration(milliseconds: 100), (i) => i)
              .groupBy(
                (i) => i % 3,
                durationSelector: (_) =>
                    Rx.timer(null, const Duration(milliseconds: 400)),
              )
              // Number the events inside each group, starting from 1.
              .flatMap((group) => group
                  .scan<int>((acc, value, index) => acc + 1, 0)
                  .map((event) => '${group.key} -> $event'))
              .take(take);

      expect(
        stream,
      );
    });

    test('Rx.groupBy.nullable', () {
      nullableTest<GroupedStream<String?, String?>>(
        (s) => s.groupBy((v) => v),
      );
    });
  }
}